package com.csw;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class Demo07HBaseToHBaseMapReduce {
    public static class ReadHBaseMapper extends TableMapper<Text,IntWritable> {
        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
            byte[] bytes = key.get();
            String rowkey = Bytes.toString(bytes);
            byte[] bytes1 = value.getValue("info".getBytes(), "clazz".getBytes());
            String clazz = Bytes.toString(bytes1);

            context.write(new Text(clazz),new IntWritable(1));
        }
    }
    public static class WriteHBaseReducer extends TableReducer<Text,IntWritable,NullWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            String clazz = key.toString();
            int sum=0;
            for (IntWritable value : values) {
                sum+=value.get();
            }
            Put put = new Put(clazz.getBytes());
            put.addColumn("info".getBytes(),"clazz_num".getBytes(),(sum+"").getBytes());

            context.write(NullWritable.get(),put);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();

        conf.set("hbase.zookeeper.quorum","master:2181,node1:2181,node2:2181");

        //创建Job实例
        Job job = Job.getInstance(conf);
        job.setJarByClass(Demo07HBaseToHBaseMapReduce.class);
        job.setJobName("Demo07HBaseToHBaseMapReduce");

        Scan scan = new Scan();

        // 配置Mapper
        //create 'clazz_num','info'
        TableMapReduceUtil.initTableMapperJob(
                TableName.valueOf("students"),
                scan,
                ReadHBaseMapper.class,
                Text.class,
                IntWritable.class,
                job
        );

        //配置Reducer
        TableMapReduceUtil.initTableReducerJob(
                "clazz_num",
                WriteHBaseReducer.class,
                job
        );

        job.waitForCompletion(true);

        /**先建表
         * create 'clazz_num','info'
         *
         * hadoop jar HBaseAPI-1.0-jar-with-dependencies.jar com.csw.Demo07HBaseToHBaseMapReduce
         */
    }
}
